package com.nstskj.study.netty.tcp.netty.protocol.executor.msg.impl;

import com.nstskj.study.netty.tcp.netty.config.properties.NettyNetDevTcpServerProperties;
import com.nstskj.study.netty.tcp.netty.config.properties.ThreadExecutionProperties;
import com.nstskj.study.netty.tcp.netty.protocol.common.ProtocolTuple;
import com.nstskj.study.netty.tcp.netty.protocol.event.CloseNetDeviceEvent;
import com.nstskj.study.netty.tcp.netty.protocol.event.CreateNetDeviceEvent;
import com.nstskj.study.netty.tcp.netty.protocol.event.base.AbstractNetDeviceEvent;
import com.nstskj.study.netty.tcp.netty.protocol.executor.AbstractWorkExecutor;
import com.nstskj.study.netty.tcp.netty.protocol.executor.msg.ProtocolHandlerWorkExecutor;
import com.nstskj.study.netty.tcp.netty.protocol.executor.work.AbstractTcpProtocolWork;
import com.nstskj.study.netty.tcp.netty.protocol.handler.msg.ProtocolHandler;
import com.nstskj.study.netty.tcp.netty.protocol.handler.msg.strategy.ProtocolHandlerStrategy;
import com.nstskj.study.netty.tcp.netty.protocol.message.in.msg.base.AbstractInputNetTcpMessage;
import com.nstskj.study.netty.tcp.netty.protocol.message.out.msg.base.AbstractOutputNetTcpMessage;
import com.nstskj.study.netty.tcp.netty.protocol.session.base.AbstractSession;
import com.nstskj.study.netty.tcp.netty.protocol.session.enums.SessionStatusEnums;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

/**
 * Spring-managed executor that owns the per-session protocol workers.
 *
 * <p>For every session announced through a {@link CreateNetDeviceEvent} a pair of workers is
 * created and handed to the thread pool of {@link AbstractWorkExecutor}: one bound to the
 * session's receive queue and one bound to its send queue. A {@link CloseNetDeviceEvent}
 * unregisters the pair and stops both workers.
 *
 * @author ZhouChuGang
 * @version 1.0
 * @project nstskj-study-netty-tcp-spring
 * @date 2021/4/21 22:03
 */
@Slf4j
@Component
public class SpringProtocolHandlerWorkExecutorImpl extends AbstractWorkExecutor implements ProtocolHandlerWorkExecutor, ApplicationListener<AbstractNetDeviceEvent> {

    /**
     * Resolves the {@link ProtocolHandler} responsible for a given command.
     */
    @Autowired
    private ProtocolHandlerStrategy protocolHandlerStrategy;

    /**
     * Workers of every registered session, keyed by session id.
     * Element one of the tuple is the receive worker, element two the send worker.
     */
    private final Map<String, ProtocolTuple<ProtocolRecMsgHandlerWork, ProtocolSendMsgHandlerWork>> handlerWorkMap = new ConcurrentHashMap<>();

    /**
     * Selects the thread-pool settings used by this executor.
     *
     * @param nettyNetDevTcpServerProperties server configuration
     * @return the settings returned by {@code getProtocolHandlerWorkExecutor()}
     */
    @Override
    protected ThreadExecutionProperties getThreadExecutionProperties(NettyNetDevTcpServerProperties nettyNetDevTcpServerProperties) {
        return nettyNetDevTcpServerProperties.getProtocolHandlerWorkExecutor();
    }

    /**
     * Queues an inbound message for the receive worker of the given session.
     *
     * @param sessionId                  id of the session the message belongs to
     * @param abstractInputNetTcpMessage message to process
     * @return {@code true} if the message was accepted by the worker's queue; {@code false} if
     * either argument is {@code null}, no workers are registered for the session, or the queue
     * rejected the message
     */
    @Override
    public boolean submit(String sessionId, AbstractInputNetTcpMessage abstractInputNetTcpMessage) {
        // ConcurrentHashMap rejects null keys and BlockingQueue rejects null elements with a
        // NullPointerException; report "not submitted" instead of blowing up in the caller.
        if (sessionId == null || abstractInputNetTcpMessage == null) {
            return false;
        }
        ProtocolTuple<ProtocolRecMsgHandlerWork, ProtocolSendMsgHandlerWork> workers = this.handlerWorkMap.get(sessionId);
        // Not expected in normal operation: the workers are registered when the session is created.
        if (workers == null) {
            return false;
        }
        return workers.getOne().getBlockingQueue().offer(abstractInputNetTcpMessage);
    }

    /**
     * Reacts to session lifecycle events: a create event registers and starts the workers of
     * the session, a close event unregisters and stops them. Other event types are ignored.
     *
     * @param netDeviceEvent the lifecycle event carrying the affected session
     */
    @Override
    public void onApplicationEvent(AbstractNetDeviceEvent netDeviceEvent) {
        AbstractSession abstractSession = netDeviceEvent.getAbstractSession();
        String sessionId = abstractSession.getSessionId();
        if (netDeviceEvent instanceof CreateNetDeviceEvent) {
            startWorkers(sessionId, abstractSession);
        } else if (netDeviceEvent instanceof CloseNetDeviceEvent) {
            stopWorkers(this.handlerWorkMap.remove(sessionId));
        }
    }

    /**
     * Creates, registers and submits the receive/send workers of a session.
     *
     * @param sessionId       id under which the workers are registered
     * @param abstractSession session the workers operate on
     */
    private void startWorkers(String sessionId, AbstractSession abstractSession) {
        ProtocolRecMsgHandlerWork recWork = new ProtocolRecMsgHandlerWork(abstractSession.getRecMessageBlockingQueue(), abstractSession, "cmdRecHandler");
        ProtocolSendMsgHandlerWork sendWork = new ProtocolSendMsgHandlerWork(abstractSession.getSendMessageBlockingQueue(), abstractSession, "cmdsendHandler");
        ProtocolTuple<ProtocolRecMsgHandlerWork, ProtocolSendMsgHandlerWork> workers = new ProtocolTuple<>(recWork, sendWork);

        // A second create event for the same id would otherwise orphan the earlier workers:
        // they would no longer be reachable through the map and could never be stopped.
        ProtocolTuple<ProtocolRecMsgHandlerWork, ProtocolSendMsgHandlerWork> displaced = this.handlerWorkMap.put(sessionId, workers);
        if (displaced != null) {
            log.warn("sessionId {} already had workers registered, stopping the previous ones", sessionId);
            stopWorkers(displaced);
        }

        boolean recSubmitted = false;
        try {
            this.getThreadPoolExecutor().execute(recWork);
            recSubmitted = true;
            this.getThreadPoolExecutor().execute(sendWork);
        } catch (RuntimeException e) {
            // The pool refused a worker (e.g. it is saturated or shut down). Unregister the pair
            // so submit() stops accepting messages nobody will process, and do not leave a lone
            // receive worker behind. The original exception still reaches the caller.
            this.handlerWorkMap.remove(sessionId, workers);
            if (recSubmitted) {
                recWork.stop();
            }
            throw e;
        }
    }

    /**
     * Stops both workers of a session.
     *
     * @param workers the worker pair, or {@code null} if none was registered
     */
    private void stopWorkers(ProtocolTuple<ProtocolRecMsgHandlerWork, ProtocolSendMsgHandlerWork> workers) {
        if (workers == null) {
            return;
        }
        workers.getOne().stop();
        workers.getTwo().stop();
    }

    /**
     * Receive-side worker of a session: dispatches each inbound message to its protocol handler.
     * Kept as an inner class because it uses the enclosing instance's handler strategy.
     */
    private class ProtocolRecMsgHandlerWork extends AbstractTcpProtocolWork<AbstractInputNetTcpMessage> {

        /**
         * Creates the worker; constructing it does not start it.
         *
         * @param blockingQueue   queue of inbound messages
         * @param abstractSession session this worker belongs to
         * @param workName        worker name, used for logging
         */
        public ProtocolRecMsgHandlerWork(BlockingQueue<AbstractInputNetTcpMessage> blockingQueue, AbstractSession abstractSession, String workName) {
            super(blockingQueue, abstractSession, workName);
        }

        /**
         * Handles one inbound message: looks up the handler for its command, invokes it and,
         * if it produced a response while the session is still running, queues that response
         * for sending. The elapsed time is logged in microseconds.
         *
         * @param abstractInputNetTcpMessage the inbound message
         * @param abstractSession            session being processed
         * @throws Exception whatever the protocol handler throws
         */
        @Override
        public void doHandlerWork(AbstractInputNetTcpMessage abstractInputNetTcpMessage, AbstractSession abstractSession) throws Exception {
            long begin = System.nanoTime();
            ProtocolHandler protocolHandler = protocolHandlerStrategy.getProtocolHandler(abstractInputNetTcpMessage.getCmd());
            if (protocolHandler != null) {
                AbstractOutputNetTcpMessage response = protocolHandler.handlerCmdMsg(abstractInputNetTcpMessage);
                // A response is only queued while the session is running; otherwise it is dropped.
                if (response != null && SessionStatusEnums.RUNING.equals(abstractSession.getStatus())) {
                    abstractSession.addSendNetTcpMessage(response);
                }
            }
            long end = System.nanoTime();

            log.info(" sessionId {} cmd {} 执行时间 {} us", abstractSession.getSessionId(), abstractInputNetTcpMessage.getCmd(), TimeUnit.NANOSECONDS.toMicros(end - begin));
        }
    }


    /**
     * Send-side worker of a session: forwards each outbound message to the session.
     * Declared static because it needs no state from the enclosing executor.
     */
    private static class ProtocolSendMsgHandlerWork extends AbstractTcpProtocolWork<AbstractOutputNetTcpMessage> {

        /**
         * Creates the worker; constructing it does not start it.
         *
         * @param blockingQueue   queue of outbound messages
         * @param abstractSession session this worker belongs to
         * @param workName        worker name, used for logging
         */
        public ProtocolSendMsgHandlerWork(BlockingQueue<AbstractOutputNetTcpMessage> blockingQueue, AbstractSession abstractSession, String workName) {
            super(blockingQueue, abstractSession, workName);
        }

        /**
         * Hands one outbound message to the session for transmission.
         *
         * @param abstractOutputNetTcpMessage the outbound message
         * @param abstractSession             session being processed
         * @throws Exception whatever the session throws while sending
         */
        @Override
        public void doHandlerWork(AbstractOutputNetTcpMessage abstractOutputNetTcpMessage, AbstractSession abstractSession) throws Exception {
            abstractSession.sendOutputNetTcpMessage(abstractOutputNetTcpMessage);
        }
    }

}
